# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This source code is dual-licensed under either the MIT license found in the
# LICENSE-MIT file in the root directory of this source tree or the Apache
# License, Version 2.0 found in the LICENSE-APACHE file in the root directory
# of this source tree. You may select, at your option, one of the
# above-listed licenses.

# pyre-strict

import argparse
import cProfile
import json
import logging
import os
import pstats
import shlex
import sys
from pathlib import Path
from typing import Dict, List, Optional

from apple.tools.code_signing.apple_platform import ApplePlatform
from apple.tools.code_signing.codesign_bundle import (
    AdhocSigningContext,
    codesign_bundle,
    CodesignConfiguration,
    CodesignedPath,
    signing_context_with_profile_selection,
    write_empty_codesign_manifest,
)
from apple.tools.code_signing.list_codesign_identities import (
    AdHocListCodesignIdentities,
    ListCodesignIdentities,
)

from apple.tools.re_compatibility_utils.writable import make_dir_recursively_writable

from .action_metadata import action_metadata_if_present

from .assemble_bundle import assemble_bundle
from .assemble_bundle_types import BundleSpecItem, IncrementalContext
from .incremental_state import (
    IncrementalState,
    IncrementalStateItem,
    IncrementalStateJSONEncoder,
    parse_incremental_state,
)
from .incremental_utils import codesigned_on_copy_item
from .signing_context import (
    add_args_for_signing_context,
    signing_context_and_selected_identity_from_args,
)
from .swift_support import run_swift_stdlib_tool, SwiftSupportArguments


# Key handed to `action_metadata_if_present` in `_incremental_context`. Per the
# comment there, it names an environment variable; when that variable is not set
# the tool runs in non-incremental mode.
_METADATA_PATH_KEY = "ACTION_METADATA"


def _args_parser() -> argparse.ArgumentParser:
    """Build the command line parser of the bundle assembling tool.

    Options are registered in a fixed order (which is also the order shown in
    `--help`): output/spec paths, code signing options, incremental state,
    profiling, logging levels and Swift support related options. Finally the
    parser is handed to `add_args_for_signing_context` which registers further
    options of its own.

    Returns:
        The fully configured parser.
    """
    parser = argparse.ArgumentParser(
        description="Tool which assembles the Apple bundle."
    )

    def add_path_option(
        flag: str, metavar: str, help_text: str, required: bool = False
    ) -> None:
        # All path-valued options share the same shape: the value is converted
        # to `Path` and only flag/metavar/help/required differ.
        parser.add_argument(
            flag,
            metavar=metavar,
            type=Path,
            required=required,
            help=help_text,
        )

    add_path_option(
        "--output",
        "</path/to/app.bundle>",
        "Absolute path to Apple bundle result.",
        required=True,
    )
    add_path_option(
        "--spec",
        "<Spec.json>",
        "Path to file with JSON representing the bundle contents. It should contain a dictionary which maps bundle relative destination paths to source paths.",
        required=True,
    )
    add_path_option(
        "--codesign-tool",
        "<codesign>",
        "Path to code signing utility. If not provided standard `codesign` tool will be used.",
    )
    add_path_option(
        "--codesign-manifest",
        "<manifest_file>",
        "Path to a file containing codesign invocations.",
    )
    # Repeatable option: every occurrence appends one more argument.
    parser.add_argument(
        "--codesign-args",
        action="append",
        type=str,
        default=[],
        required=False,
        help="Add additional args to pass during codesigning. Pass as`--codesign-args=ARG` to ensure correct arg parsing.",
    )
    parser.add_argument(
        "--codesign-configuration",
        type=CodesignConfiguration,
        choices=[configuration.value for configuration in CodesignConfiguration],
        required=False,
        help=f"""
            Augments how code signing is run.
            Pass `{CodesignConfiguration.fastAdhoc}` to skip adhoc signing bundles if the executables are already adhoc signed.
            Pass `{CodesignConfiguration.executionBypass}` to skip executing the actual codesigning commands.
            Pass `{CodesignConfiguration.dryRun}` for code signing to be run in dry mode (instead of actual signing only .plist
            files with signing parameters will be generated in the root of each signed bundle).
        """,
    )
    add_path_option(
        "--incremental-state",
        "<IncrementalState.json>",
        "Required if script is run in incremental mode. Path to file with JSON which describes the contents of bundle built previously.",
    )
    add_path_option(
        "--profile-output",
        "<ProfileOutput.txt>",
        "Path to the profiling output. If present profiling will be enabled.",
    )

    # Both logging level options accept the same set of level names and differ
    # only in the default and in the destination mentioned in the help text.
    for level_flag, default_level, destination in (
        ("--log-level-stderr", "warning", "stderr"),
        ("--log-level-file", "info", "a log file"),
    ):
        parser.add_argument(
            level_flag,
            choices=["debug", "info", "warning", "error", "critical"],
            type=str,
            required=False,
            default=default_level,
            help=f"Logging level for messages written to {destination}.",
        )

    # Bundle relative destinations which are needed for Swift support.
    for destination_flag, destination_metavar, described_as in (
        ("--binary-destination", "<Binary>", "bundle binary"),
        ("--frameworks-destination", "<Frameworks>", "frameworks directory"),
        (
            "--extensionkit-extensions-destination",
            "<ExtensionKitExtensions>",
            "ExtensionKit Extensions directory",
        ),
        ("--plugins-destination", "<Plugins>", "plugins directory"),
        ("--appclips-destination", "<AppClips>", "appclips directory"),
    ):
        add_path_option(
            destination_flag,
            destination_metavar,
            f"Required if swift support was requested. Bundle relative destination path to {described_as}.",
        )

    add_path_option(
        "--sdk-root",
        "<path/to/SDK>",
        "Required if swift support was requested. Path to SDK root.",
    )
    parser.add_argument(
        "--swift-stdlib-command",
        metavar='<"/swift/stdlib/tool --foo bar/qux">',
        type=str,
        required=False,
        help="Swift stdlib command prefix. If present, output bundle will contain needed Swift standard libraries (to support the lack of ABI stability or certain backports usage).",
    )

    # Plain boolean switches.
    parser.add_argument(
        "--check-conflicts",
        action="store_true",
        help="Check there are no path conflicts between different source parts of the bundle if enabled.",
    )
    parser.add_argument(
        "--versioned-if-macos",
        action="store_true",
        help="Create symlinks for versioned macOS bundle",
    )

    add_args_for_signing_context(parser)

    return parser


def _get_codesigned_paths_for_spec_item(
    bundle_path: CodesignedPath,
    args: argparse.Namespace,
    item: BundleSpecItem,
    codesign_configuration: Optional[CodesignConfiguration],
) -> List[CodesignedPath]:
    """Describe everything that has to be codesigned for one bundle spec item.

    Args:
        bundle_path: The bundle being assembled; only its `path` is read.
        args: Parsed command line arguments; `codesign_args` is the fallback
            for items which do not override codesign flags.
        item: The spec item to inspect.
        codesign_configuration: Active codesign configuration, if any.

    Returns:
        An empty list when the item is not codesigned on copy. Otherwise one
        entry per extra codesign path which is signed on its own, followed by
        an entry for the item itself.

    Raises:
        RuntimeError: If an extra codesign path doesn't exist inside the item.
    """
    if not item.codesign_on_copy:
        return []

    entitlements = None
    if item.codesign_entitlements:
        entitlements = Path(item.codesign_entitlements)

    # An explicit (even empty) override wins over the command line flags.
    flags = item.codesign_flags_override
    if flags is None:
        flags = args.codesign_args

    item_root = bundle_path.path / item.dst
    dry_run = codesign_configuration is CodesignConfiguration.dryRun

    result = []
    files_signed_with_item = []
    for relative_extra_path in item.extra_codesign_paths or []:
        extra_path = item_root / relative_extra_path
        if not extra_path.exists():
            raise RuntimeError(
                f"Found non-existing extra path to codesign: {relative_extra_path} for {item.src}"
            )

        if extra_path.is_file() and dry_run:
            # In dry-run mode, non-bundle items should be signed as part of the containing bundle.
            files_signed_with_item.append(relative_extra_path)
            continue

        result.append(
            CodesignedPath(
                path=extra_path,
                entitlements=entitlements,
                flags=flags,
                extra_file_paths=None,
            )
        )

    # The item itself goes last, carrying the files deferred above.
    result.append(
        CodesignedPath(
            path=item_root,
            entitlements=entitlements,
            flags=flags,
            extra_file_paths=files_signed_with_item,
        )
    )
    return result


def _get_codesigned_paths_from_spec(
    bundle_path: CodesignedPath,
    args: argparse.Namespace,
    spec: List[BundleSpecItem],
) -> List[CodesignedPath]:
    """Collect the codesigned paths of all spec items, in spec order.

    Args:
        bundle_path: The bundle being assembled.
        args: Parsed command line arguments; `codesign_configuration` is
            forwarded for every item.
        spec: Bundle spec items to inspect.

    Returns:
        Concatenation of the per-item results of
        `_get_codesigned_paths_for_spec_item`.
    """
    return [
        codesigned_path
        for spec_item in spec
        for codesigned_path in _get_codesigned_paths_for_spec_item(
            bundle_path=bundle_path,
            args=args,
            item=spec_item,
            codesign_configuration=args.codesign_configuration,
        )
    ]


def _main() -> None:
    """Entry point: assemble the bundle described by the command line arguments.

    Steps, in order: parse arguments, set up logging (and optional profiling),
    build the signing context, load and deduplicate the bundle spec, assemble
    the bundle, optionally run the Swift stdlib tool, codesign the bundle (or
    write an empty codesign manifest), persist the incremental state and
    finally dump profiling statistics.

    Raises:
        RuntimeError: If codesigning was requested but no signing context was
            created, or if incremental state was produced without an
            incremental context.
    """
    # NOTE(review): `args.log_file`, `args.codesign`, `args.entitlements` and
    # `args.platform` are read below but are not registered in `_args_parser`
    # itself; presumably `add_args_for_signing_context` (or other code not
    # visible here) registers them — confirm.
    args_parser = _args_parser()
    args = args_parser.parse_args()

    uname_info = os.uname()
    sysname = uname_info.sysname
    # Only the major component of the kernel release is needed for the check below.
    major_release = int(uname_info.release.split(".")[0])

    # macOS 26.0.0 Tahoe no longer supports SHA1 as an option in the codesign tool.
    # Note: This is safe to remove along with the code that adds the digest-algorithm code
    # in apple_bundle_wrapping_rule.bzl once we are migrated to macOS 26.0.
    if (
        "--digest-algorithm=sha1" in args.codesign_args
        and sysname == "Darwin"
        and major_release
        >= 25  # Darwin kernel release 25.0.0 corresponds to macOS 26.0.0 Tahoe
    ):
        # `list.remove` drops only the first occurrence of the flag.
        args.codesign_args.remove("--digest-algorithm=sha1")

    if args.log_file:
        with open(args.log_file, "w") as _:
            # We need to open the log file for two reasons:
            # - Ensure it exists after action runs, as it's an output and thus required
            # - It gets erased, so that we get new logs when doing incremental bundling
            pass

    # Level names were validated by argparse `choices`, so the upper-cased
    # name is always an attribute of the `logging` module.
    _setup_logging(
        stderr_level=getattr(logging, args.log_level_stderr.upper()),
        file_level=getattr(logging, args.log_level_file.upper()),
        log_path=args.log_file,
    )

    # The profiler object is always created but only enabled on request.
    pr = cProfile.Profile()
    profiling_enabled = args.profile_output is not None
    if profiling_enabled:
        pr.enable()

    signing_context, selected_identity_argument = (
        signing_context_and_selected_identity_from_args(args)
    )

    with args.spec.open(mode="rb") as spec_file:
        # Every JSON object in the spec is converted into a `BundleSpecItem`.
        spec = json.load(spec_file, object_hook=lambda d: BundleSpecItem(**d))
        spec = _deduplicate_spec(spec)

    # `None` when the tool is not run in incremental mode.
    incremental_context = _incremental_context(
        incremenatal_state_path=args.incremental_state,
        codesigned=args.codesign,
        codesign_configuration=args.codesign_configuration,
        codesign_identity=selected_identity_argument,
        codesign_arguments=args.codesign_args,
        versioned_if_macos=args.versioned_if_macos,
    )

    incremental_state = assemble_bundle(
        spec=spec,
        bundle_path=args.output,
        incremental_context=incremental_context,
        check_conflicts=args.check_conflicts,
        versioned_if_macos=args.versioned_if_macos,
    )

    # `None` unless `--swift-stdlib-command` was passed; reports a parser error
    # when Swift support is requested with incomplete arguments.
    swift_support_args = _swift_support_arguments(
        args_parser,
        args,
    )

    if swift_support_args:
        swift_stdlib_paths = run_swift_stdlib_tool(
            bundle_path=args.output,
            args=swift_support_args,
        )
    else:
        swift_stdlib_paths = []

    if args.codesign:
        # Vendored frameworks/bundles could already be pre-signed, in which case,
        # re-signing them requires modifying them. On RE, the umask is such that
        # copied files (when constructing the bundle) are not writable.
        make_dir_recursively_writable(args.output)
        if signing_context is None:
            raise RuntimeError(
                "Expected signing context to be created before bundling is done if codesign is requested."
            )

        bundle_path = CodesignedPath(
            path=args.output,
            entitlements=args.entitlements,
            flags=args.codesign_args,
            extra_file_paths=None,
        )
        # Paths to sign in addition to the bundle itself: everything the spec
        # marks as codesign-on-copy plus the paths reported by the Swift stdlib
        # tool (the latter are signed without entitlements).
        codesign_on_copy_paths = _get_codesigned_paths_from_spec(
            bundle_path=bundle_path, spec=spec, args=args
        ) + [
            CodesignedPath(
                path=bundle_path.path / path,
                entitlements=None,
                flags=args.codesign_args,
                extra_file_paths=None,
            )
            for path in swift_stdlib_paths
        ]

        codesign_bundle(
            bundle_path=bundle_path,
            signing_context=signing_context,
            platform=args.platform,
            codesign_on_copy_paths=codesign_on_copy_paths,
            codesign_tool=args.codesign_tool,
            codesign_configuration=args.codesign_configuration,
            codesign_manifest_path=args.codesign_manifest,
        )
    elif args.codesign_manifest:
        # Always write the codesign manifest file, even when unsigned
        write_empty_codesign_manifest(
            codesign_manifest_path=args.codesign_manifest,
            bundle_path=args.output,
        )

    # Persist the state describing this run so that it can be read back via
    # `--incremental-state` on the next run.
    if incremental_state:
        if incremental_context is None:
            raise RuntimeError(
                "Expected incremental context to be present when incremental state is non-null."
            )
        _write_incremental_state(
            spec=spec,
            items=incremental_state,
            path=args.incremental_state,
            codesigned=args.codesign,
            codesign_configuration=args.codesign_configuration,
            selected_codesign_identity=selected_identity_argument,
            codesign_arguments=args.codesign_args,
            swift_stdlib_paths=swift_stdlib_paths,
            versioned_if_macos=args.versioned_if_macos,
            incremental_context=incremental_context,
        )

    if profiling_enabled:
        pr.disable()
        # Dump the collected statistics sorted by cumulative time.
        with open(args.profile_output, "w") as s:
            sortby = pstats.SortKey.CUMULATIVE
            ps = pstats.Stats(pr, stream=s).sort_stats(sortby)
            ps.print_stats()


def _incremental_context(
    incremenatal_state_path: Optional[Path],
    codesigned: bool,
    codesign_configuration: CodesignConfiguration,
    codesign_identity: Optional[str],
    codesign_arguments: List[str],
    versioned_if_macos: bool,
) -> Optional[IncrementalContext]:
    """Create the context used for incremental bundling, if applicable.

    Args:
        incremenatal_state_path: Location of the state written by a previous
            run, if any.
        codesigned: Whether the bundle is going to be codesigned.
        codesign_configuration: Codesign configuration of the current run.
        codesign_identity: Selected codesign identity of the current run.
        codesign_arguments: Extra codesign arguments of the current run.
        versioned_if_macos: Whether a versioned macOS bundle is requested.

    Returns:
        `None` when action metadata is not available (non-incremental mode),
        otherwise a context combining the metadata, the previous state (which
        may itself be `None`) and the parameters of the current run.
    """
    metadata = action_metadata_if_present(_METADATA_PATH_KEY)
    if metadata is None:
        # Environment variable not set, running in non-incremental mode.
        return None

    # If there is no incremental state or we failed to parse it (maybe because of a format change)
    # do a clean (non-incremental) assemble right now but generate proper state for next run.
    previous_state = None
    if incremenatal_state_path:
        previous_state = _read_incremental_state(incremenatal_state_path)

    return IncrementalContext(
        metadata=metadata,
        state=previous_state,
        codesigned=codesigned,
        codesign_configuration=codesign_configuration,
        codesign_identity=codesign_identity,
        codesign_arguments=codesign_arguments,
        versioned_if_macos=versioned_if_macos,
    )


def _read_incremental_state(path: Path) -> Optional[IncrementalState]:
    """Read, and then delete, the incremental state file left by a previous run.

    Args:
        path: Location of the incremental state file.

    Returns:
        The parsed state, or `None` when the file doesn't exist or couldn't be
        read/parsed (the failure is logged, not raised).
    """
    logger = logging.getLogger(__name__)
    logger.info(f"Will read incremental state from `{path}`.")

    if not path.exists():
        logger.warning(f"File with incremental state doesn't exist at `{path}`.")
        return None

    parsed_state: Optional[IncrementalState] = None
    try:
        with path.open() as state_file:
            parsed_state = parse_incremental_state(state_file)
    except Exception:
        logger.exception("Failed to read incremental state")
    finally:
        # If something goes wrong and we don't delete the file
        # we probably end up in faulty state where incremental state
        # doesn't match the output. Hence delete it early.
        path.unlink()
    return parsed_state


def _swift_support_arguments(
    parser: argparse.ArgumentParser,
    args: argparse.Namespace,
) -> Optional[SwiftSupportArguments]:
    """Validate and bundle up the arguments needed for Swift support.

    Args:
        parser: Parser used to report missing arguments via `parser.error`.
        args: Parsed command line arguments.

    Returns:
        `None` when `--swift-stdlib-command` wasn't passed, otherwise the
        collected Swift support arguments.
    """
    if not args.swift_stdlib_command:
        return None

    # (attribute on `args`, command line flag) pairs, in the order they are checked.
    required_arguments = (
        ("binary_destination", "--binary-destination"),
        ("appclips_destination", "--appclips-destination"),
        ("frameworks_destination", "--frameworks-destination"),
        (
            "extensionkit_extensions_destination",
            "--extensionkit-extensions-destination",
        ),
        ("plugins_destination", "--plugins-destination"),
        ("platform", "--platform"),
        ("sdk_root", "--sdk-root"),
    )
    for attribute_name, flag in required_arguments:
        if not getattr(args, attribute_name):
            parser.error(
                f"Expected `{flag}` argument to be specified when `--swift-stdlib-command` is present."
            )

    return SwiftSupportArguments(
        swift_stdlib_command=args.swift_stdlib_command,
        binary_destination=args.binary_destination,
        appclips_destination=args.appclips_destination,
        frameworks_destination=args.frameworks_destination,
        extensionkit_extensions_destination=args.extensionkit_extensions_destination,
        plugins_destination=args.plugins_destination,
        platform=args.platform,
        sdk_root=args.sdk_root,
    )


def _write_incremental_state(
    spec: List[BundleSpecItem],
    items: List[IncrementalStateItem],
    path: Path,
    codesigned: bool,
    codesign_configuration: CodesignConfiguration,
    selected_codesign_identity: Optional[str],
    codesign_arguments: List[str],
    swift_stdlib_paths: List[Path],
    versioned_if_macos: bool,
    incremental_context: IncrementalContext,
) -> None:
    """Serialize the state of the current run as JSON into `path`.

    Args:
        spec: Bundle spec; items flagged `codesign_on_copy` are recorded.
        items: State items produced while assembling the bundle.
        path: Destination file.
        codesigned: Whether the bundle was codesigned.
        codesign_configuration: Codesign configuration of this run.
        selected_codesign_identity: Codesign identity of this run.
        codesign_arguments: Extra codesign arguments of this run.
        swift_stdlib_paths: Paths reported by the Swift stdlib tool.
        versioned_if_macos: Whether a versioned macOS bundle was requested.
        incremental_context: Context of the current incremental run.

    Raises:
        Exception: Whatever writing/serialization raised; the destination file
            is removed before the exception is re-raised.
    """
    codesigned_on_copy_items = []
    for spec_item in spec:
        if not spec_item.codesign_on_copy:
            continue
        item_entitlements = None
        if spec_item.codesign_entitlements:
            item_entitlements = Path(spec_item.codesign_entitlements)
        codesigned_on_copy_items.append(
            codesigned_on_copy_item(
                path=Path(spec_item.dst),
                entitlements=item_entitlements,
                incremental_context=incremental_context,
                codesign_flags_override=spec_item.codesign_flags_override,
                extra_codesign_paths=spec_item.extra_codesign_paths,
            )
        )

    state = IncrementalState(
        items,
        codesigned=codesigned,
        codesign_configuration=codesign_configuration,
        codesigned_on_copy=codesigned_on_copy_items,
        codesign_identity=selected_codesign_identity,
        codesign_arguments=codesign_arguments,
        swift_stdlib_paths=swift_stdlib_paths,
        versioned_if_macos=versioned_if_macos,
    )

    path.touch()
    try:
        with path.open(mode="w") as state_file:
            json.dump(state, state_file, cls=IncrementalStateJSONEncoder)
    except Exception:
        # Don't leave a partially written state file behind.
        path.unlink()
        raise


def _deduplicate_spec(spec: List[BundleSpecItem]) -> List[BundleSpecItem]:
    """Return the spec items without duplicates, in sorted order.

    Args:
        spec: Bundle spec items, possibly containing equal items.

    Returns:
        A new sorted list in which every distinct item appears exactly once.
    """
    # The same spec item may be present multiple times because different
    # apple_resource() targets can refer to the _same_ resource file.
    #
    # On RE, we're not allowed to overwrite files, so identical file copies
    # have to be prevented.
    #
    # `dict` preserves key order, so dropping duplicates through it keeps the
    # first occurrence of every item and is deterministic.
    #
    # Sorting forces the same order as in Buck1 for `SourcePathWithAppleBundleDestination`.
    # WARNING: This logic is tightly coupled with how spec filtering is done in `_filter_conflicting_paths` method during incremental bundling. Don't change unless you fully understand what is going on here.
    return sorted(dict.fromkeys(spec))


def _setup_logging(
    stderr_level: int, file_level: int, log_path: Optional[Path]
) -> None:
    """Configure root logging with a stderr handler and an optional file handler.

    Args:
        stderr_level: Minimum level of records emitted by the stderr handler.
        file_level: Minimum level of records emitted by the file handler.
        log_path: Log file location; no file handler is installed when falsy.
    """
    log_format = (
        "%(asctime)s - %(name)s - %(levelname)s - %(message)s (%(filename)s:%(lineno)d)"
    )

    # Colors are only used when stderr is attached to a terminal.
    stderr_formatter: logging.Formatter
    if sys.stderr.isatty():
        stderr_formatter = ColoredLogFormatter(log_format)
    else:
        stderr_formatter = logging.Formatter(log_format)

    stderr_handler = logging.StreamHandler()
    stderr_handler.setLevel(stderr_level)
    stderr_handler.setFormatter(stderr_formatter)
    handlers: List[logging.Handler] = [stderr_handler]

    if log_path:
        file_handler = logging.FileHandler(log_path, encoding="utf-8")
        file_handler.setLevel(file_level)
        file_handler.setFormatter(logging.Formatter(log_format))
        handlers.append(file_handler)

    # The root logger lets everything through; filtering is done per handler.
    logging.basicConfig(level=logging.DEBUG, handlers=handlers)


class ColoredLogFormatter(logging.Formatter):
    """Formatter which wraps each formatted record into an ANSI color by level.

    Records of the five standard levels are formatted as
    `<color escape> + text_format + <reset escape>`. Records of any other
    level (e.g. custom numeric levels) are formatted with the plain,
    uncolored `text_format` instead of failing.

    Args:
        text_format: %-style logging format string applied to every record.
    """

    # ANSI SGR escape sequences emitted in front of the message, keyed by level.
    _colors: Dict[int, str] = {
        logging.DEBUG: "\x1b[m",
        logging.INFO: "\x1b[37m",
        logging.WARNING: "\x1b[33m",
        logging.ERROR: "\x1b[31m",
        logging.CRITICAL: "\x1b[1;31m",
    }
    # Escape sequence emitted after the message to restore default attributes.
    _reset_color = "\x1b[0m"

    def __init__(self, text_format: str) -> None:
        # Initialise the base class so that inherited state and helpers
        # (`usesTime`, `formatTime`, ...) work, and so that the uncolored
        # format is available as a fallback for unknown levels.
        super().__init__(text_format)
        self.text_format = text_format
        # Build one formatter per known level up front rather than creating a
        # new `logging.Formatter` for every single record.
        self._colored_formatters: Dict[int, logging.Formatter] = {
            level: logging.Formatter(color + text_format + self._reset_color)
            for level, color in self._colors.items()
        }

    def format(self, record: logging.LogRecord) -> str:
        """Return `record` formatted with the color matching its level."""
        colored_formatter = self._colored_formatters.get(record.levelno)
        if colored_formatter is None:
            # Level without an assigned color: emit the message uncolored.
            return super().format(record)
        return colored_formatter.format(record)


# Run the tool only when this module is executed as a script, not on import.
if __name__ == "__main__":
    _main()
